# --- Environment setup (originally a notebook cell) ------------------------
# NOTE(review): the original cell ran `!pip install ruptures==1.0.1` AFTER
# `import ruptures`, and `!` is IPython shell magic, not valid Python.
# Install the dependency up front instead:
#   pip install ruptures==1.0.1
import os
import warnings

warnings.filterwarnings("ignore")

import itertools
import pandas as pd
import numpy as np
import matplotlib
import matplotlib.pyplot as plt

plt.style.use('fivethirtyeight')
matplotlib.rcParams['axes.labelsize'] = 14
matplotlib.rcParams['xtick.labelsize'] = 12
matplotlib.rcParams['ytick.labelsize'] = 12
matplotlib.rcParams['text.color'] = 'k'

from bokeh.io import output_notebook, show
from bokeh.plotting import figure
from bokeh.models import HoverTool
import ruptures as rpt

# Route bokeh output into the notebook.
output_notebook()
# --- Load interval files and the holiday calendar --------------------------
prefix_path = 'interval_data/'
file_names = sorted(os.listdir(prefix_path))
file_names

nmi = pd.read_csv('nmi_info.csv')
holidays = pd.read_csv('holidays.csv')

# One holiday row per (Nmi, LocalDate): join meters to their state's
# holidays, then drop the join columns we no longer need.
merged = nmi.merge(holidays, on='State')\
            .drop(['OutInterval', 'State'], axis=1)\
            .drop_duplicates()
# reset_index(drop=True) replaces `.reset_index().drop('index', 1)` — the
# positional `axis` argument was removed in pandas 2.0.
nmi_with_holiday = merged.sort_values(['Nmi', 'LocalDate']).reset_index(drop=True)

# Sanity check: inspect a single meter's holidays.
nmi_with_holiday[nmi_with_holiday['Nmi'] == 'NMIA1']
# --- Per-file null audit ----------------------------------------------------
# Report how many readings (E) and timestamps (AESTTime) are missing per file.
# (Loop-body indentation was lost in the notebook export; restored here.)
df = pd.read_csv(prefix_path + file_names[3])
energy_df = df
for name in file_names:
    df = pd.read_csv(prefix_path + name)
    e_null = df['E'].isnull().sum()
    time_null = df['AESTTime'].isnull().sum()
    print(f"\n{name}:")
    print(f"{time_null} missing TimeStamp")
    print(f"{e_null} missing E")
Observation: 1416 nulls in NMIA3.csv and 1416 nulls in NMIS3.csv.
# Print each file's shape and the time span it covers.
for name in file_names:
    df = pd.read_csv(prefix_path + name)
    print(f"{name} size: {df.shape} from: {df.AESTTime.min()} to: {df.AESTTime.max()}")
# Load NMIA2 and re-index it on the timestamp column.
NMIA2 = pd.read_csv(prefix_path + 'NMIA2.csv')
timestamps = pd.DatetimeIndex(NMIA2['AESTTime'])
print(timestamps.shape)
NMIA2 = NMIA2.set_index(timestamps).drop(['AESTTime'], axis=1)
NMIA2.head()
# Inspect the missing-E rows in the two files flagged by the audit above.
df = pd.read_csv(prefix_path + 'NMIA3.csv')
a3_nans = df[df['E'].isnull()]
null_percentage = df['E'].isnull().mean()
print(f"null_percentage: {null_percentage * 100}%")

df = pd.read_csv(prefix_path + 'NMIS3.csv')
s3_nans = df[df['E'].isnull()]
null_percentage = df['E'].isnull().mean()
print(f"null_percentage: {null_percentage * 100}%")

# When do the NMIA3 gaps occur?
a3_nans[['AESTTime']]
# --- Side-by-side usage histograms for paired files -------------------------
from bokeh.plotting import figure, output_file, show
from bokeh.layouts import row
from numpy import histogram

# Consume the sorted file list two at a time and plot a density histogram
# of E for each file in the pair.
it = iter(sorted(file_names))
for l, r in zip(it, it):  # no need to materialize the zip into a list
    ldf = pd.read_csv(prefix_path + l).fillna(0)
    rdf = pd.read_csv(prefix_path + r).fillna(0)
    # bokeh >= 3.0 renamed plot_height/plot_width to height/width.
    lp = figure(title=l[3:5], height=300, width=480)
    rp = figure(title=r[3:5], height=300, width=480)
    hist, edges = histogram(ldf.E, density=True, bins=50)
    hist2, edges2 = histogram(rdf.E, density=True, bins=50)
    lp.quad(top=hist, bottom=0, left=edges[:-1], right=edges[1:], line_color="white")
    rp.quad(top=hist2, bottom=0, left=edges2[:-1], right=edges2[1:], line_color="white")
    show(row(lp, rp))
# --- Preprocess every file into a clean 30-minute series --------------------
file_map = {}
for name in file_names:
    df = pd.read_csv(prefix_path + name)
    # Fill missing readings by nearest-neighbour interpolation. 'nearest'
    # cannot fill leading/trailing NaNs, so backstop with ffill/bfill —
    # the validation loop below requires zero nulls.
    df.E = df.E.interpolate('nearest').ffill().bfill()
    # Index on the timestamp column.
    df = df.set_index(pd.DatetimeIndex(df['AESTTime'])).drop(['AESTTime'], axis=1)
    # Aggregate onto a uniform 30-minute grid (the '30T' alias is
    # deprecated in modern pandas; '30min' is the supported spelling).
    df = df.resample('30min').mean()
    # One year of half-hour slots: 365 * 48 = 17520 rows.
    assert df.shape[0] == 17520
    print('\n' + name + ' ====>')
    print(df.shape)
    print(df.head())
    file_map[name] = df
file_map.keys()
# Validate every preprocessed series: complete and uniquely indexed.
# (Loop-body indentation was lost in the notebook export; restored here.)
for k, df in file_map.items():
    print(f'asserting unique index number (17520) + no null column: {k}')
    assert df['E'].isnull().sum() == 0        # no missing readings remain
    assert len(df.index.unique()) == 17520    # one row per half-hour slot
df.loc[df['E'].isnull()]
# Pick one meter and eyeball its first month of readings.
testdf = file_map['NMIG1.csv']
testdf.head()

days = 30
y = testdf['E'][:48 * days]   # 48 half-hour slots per day
y.head()

y.plot(
    figsize=(15, 6),
    title='NMIG1',
)
# series.groupby(series.index.hour).mean()
# series.groupby(pd.Grouper(freq='30min')).mean()
# from pandas import Series
# from pandas import DataFrame
# from pandas import Grouper
# from matplotlib import pyplot
# series = df['E']
# groups = series.groupby(pd.Grouper(freq='1h'))
# years = DataFrame()
# for k, v in groups:
# years[k] = 1 #v.values.mean()
# years
# years = years.T
# pyplot.matshow(years, interpolation=None, aspect='auto')
# pyplot.show()
# --- Interactive bokeh time-series plot per meter ---------------------------
for file_name, df in file_map.items():
    TOOLS = "save,pan,box_zoom,reset,wheel_zoom"
    # bokeh >= 3.0 renamed plot_height/plot_width to height/width.
    p = figure(title=file_name[3:5],
               x_axis_type="datetime",
               y_axis_type="linear",
               height=400,
               tools=TOOLS,
               width=980)
    p.xaxis.axis_label = 'Time'
    p.yaxis.axis_label = 'NMI reading'
    p.line(x=df.index, y=df.E, line_color="purple")
    # Hover tooltip with a formatted timestamp. Modern bokeh expects the
    # field reference ('@x') as the formatters key, not the bare name 'x'.
    p.add_tools(HoverTool(
        tooltips=[
            ('Time', '@x{%F %T}'),
            ('Usage', '@y'),
        ],
        formatters={
            '@x': 'datetime',
        },
    ))
    show(p)
# --- Change-point detection on a four-month window of NMIA3 -----------------
testdf = file_map['NMIA3.csv']
testdf = testdf.loc['2018-01-31':'2018-05-31']
# Take the first 30 days of the window.
days = 30
y = testdf['E'][:48 * days]
# Series.as_matrix() was removed in pandas 1.0; to_numpy() is the
# supported replacement.
signal = y.to_numpy()
m = rpt.Pelt(model="rbf", jump=2, min_size=2).fit(signal)
result = m.predict(pen=6)
rpt.display(signal, [], result, figsize=(18, 6))
# --- Low-pass filter, then change-point detection on NMIA1 ------------------
import ruptures as rpt
from scipy.signal import filtfilt, butter

testdf = file_map['NMIA1.csv']
input_y = testdf.loc['2018-05-01':'2018-05-12']['E']
print(type(input_y))  # pandas Series

# Order-3 low-pass Butterworth filter to smooth short-term noise.
b, a = butter(3, 0.05)
y = filtfilt(b, a, input_y)

plt.figure(figsize=(18, 6))
# NOTE(review): the original did `from matplotlib.pyplot import plot, show,
# figure, ...`, silently shadowing bokeh's `figure`/`show` imported at the
# top of the file. Calling through `plt.` avoids the name collision.
plt.plot(input_y.index, input_y, 'g', linewidth=1.75, alpha=0.75)

signal = y
print(type(y))
# min_size=6 half-hour slots as the minimum segment length — assumes
# business stretches last at least a few hours.
m = rpt.Pelt(model="rbf", min_size=6).fit(signal)
result = m.predict(pen=6)
rpt.display(signal, [], result, figsize=(18, 6))
# Holidays for NMIA1 that fall inside the test window.
start_date = '2017-12-24'
end_date = '2017-12-29'
# pandas deprecated the boolean `inclusive` flag; 'both' keeps the
# original closed-interval behaviour.
day_mask = nmi_with_holiday.LocalDate.between(start_date, end_date, inclusive='both')
# Combine the two masks in a single indexing step — chained indexing with a
# mask built from the unfiltered frame is fragile and raises warnings.
matched_holiday = nmi_with_holiday[day_mask & (nmi_with_holiday.Nmi == 'NMIA1')]
matched_holiday
# --- Local minima of the 10-point moving average ----------------------------
from scipy.signal import argrelmin

plt.figure(figsize=(18, 6))
s = testdf.loc[start_date:end_date]['E']
s.plot()

# 10-sample moving average; the first 9 values are NaN.
rolling = s.rolling(window=10)
ma10 = rolling.mean()
# Backfill the leading NaNs. 'bfill' is not a valid interpolate() method in
# modern pandas; Series.bfill() is the supported spelling.
signal = ma10.bfill()            # Series
# Series.as_matrix() was removed in pandas 1.0; use to_numpy().
ma10_signal = signal.to_numpy()  # ndarray
lows = argrelmin(ma10_signal)[0]
print(lows)  # x positions of the local minima

# Overlay the moving average on the raw series.
ma10.plot(color='red')
m = rpt.Pelt(model="rbf").fit(ma10_signal)
result = m.predict(pen=6)
# Change-point plot.
rpt.display(signal, [], result, figsize=(18, 6))
MA(10) doesn't have a closed form, so we can't get its stationary points or local minima analytically.
# Zoom into two days and overlay the 10-sample moving average.
plt.figure(figsize=(18, 6))
two_days = testdf.loc['2018-05-02':'2018-05-03']['E']
two_days.plot()
two_days.rolling(window=10).mean().plot(color='red')

# A longer five-day window of raw readings.
plt.figure(figsize=(18, 6))
testdf.loc['2018-05-08':'2018-05-12']['E'].plot()
# Change-point detection on the raw (unfiltered) NMIA1 series.
testdf = file_map['NMIA1.csv']
input_y = testdf.loc['2018-05-01':'2018-05-12']['E']
print(type(input_y))  # pandas Series
# Series.as_matrix() was removed in pandas 1.0; use to_numpy().
signal = input_y.to_numpy()
print(type(signal))
# min_size=8 half-hour slots: assume business hours last > 4 hours.
m = rpt.Pelt(model="rbf", min_size=8).fit(signal)
result = m.predict(pen=6)
rpt.display(signal, [], result, figsize=(18, 6))
The change-point algorithm picks the middle point as the start of business.
Since the system has a 30-minute lag, the true starting point should be earlier than the detected middle point.
# --- Full-year change-point detection for one meter -------------------------
testdf.index.min(), testdf.index.max()

test_file_name = 'NMIA1.csv'
testdf = file_map[test_file_name]
input_y = testdf.loc['2017-10-01':'2018-09-30']['E']
# Series.as_matrix() was removed in pandas 1.0; use to_numpy().
signal = input_y.to_numpy()
# min_size=10 half-hour slots: assume business hours last > 5 hours.
m = rpt.Pelt(model="rbf", min_size=10, jump=5).fit(signal)
result = m.predict(pen=6)
# Don't call rpt.display on a dataset this large.
# Pelt always appends the series length (17520 here) as a final sentinel
# breakpoint; drop it so every remaining index is a real change point.
# Guard instead of a bare remove(17520), which raises ValueError if absent.
if result and result[-1] == len(signal):
    result.pop()
# Check the last real change point.
result[-1]

# Rows at the detected change points.
rdf = testdf.iloc[result]
rdf.head(6)
rdf.shape

# Keep only dates with exactly two change points (an open AND a close).
filted_biz_hr = rdf.groupby(rdf.index.date).filter(lambda x: x.count() == 2)
filted_biz_hr = filted_biz_hr.reset_index()
filted_biz_hr['T'] = filted_biz_hr.AESTTime.dt.date
filted_biz_hr.head(6)
# Holiday calendar restricted to the current meter ('.csv' suffix stripped).
meter_id = test_file_name[:-4]  # e.g. 'NMIA1'
related_holiday = (nmi_with_holiday[nmi_with_holiday['Nmi'] == meter_id]
                   .reset_index()
                   .rename(columns={'LocalDate': 'T'}))
related_holiday
# Both 'T' columns must be datetime64 for the merge keys to line up.
filted_biz_hr['T'] = pd.to_datetime(filted_biz_hr['T'])
related_holiday['T'] = pd.to_datetime(related_holiday['T'])

# Test: left-excluding join — remove holiday dates from the business hours.
# Jan 1st and 26th Dec are excluded.
# (Positional `axis` was removed from drop() in pandas 2.0; pass axis=1.)
filted_biz_hr.iloc[156:].merge(related_holiday, on='T', how='left', indicator=True)\
    .query('_merge == "left_only"')\
    .drop(['_merge', 'T', 'Nmi'], axis=1).head(18)

# Apply the same anti-join for real.
filted_biz_hr = filted_biz_hr.merge(related_holiday, on='T', how='left', indicator=True)\
    .query('_merge == "left_only"').drop(['_merge', 'T', 'Nmi'], axis=1)
filted_biz_hr.shape
# Split the alternating change-point times into open/close pairs.
base_times = filted_biz_hr.AESTTime.dt.time
start_times = base_times[::2]    # even rows: business opens
close_times = base_times[1::2]   # odd rows: business closes
print(start_times[0:3])
print(close_times[0:3])

# Stringify for the time-averaging helper.
start_times_in_str = [str(t) for t in start_times.values]
close_times_in_str = [str(t) for t in close_times.values]
start_times_in_str[:3], close_times_in_str[:3]
from datetime import timedelta


def avg_time_in_str(start_times_in_str):
    """Average a list of 'HH:MM:SS' strings and return the mean as a string.

    Each time is converted to seconds since midnight, the values are
    averaged, and the mean is rendered back through ``timedelta``.

    Bug fix: the original dropped the seconds component of each time
    (it only used hours and minutes); seconds are now included.
    """
    def _to_seconds(hms):
        h, m, s = hms.split(':')
        return int(h) * 3600 + int(m) * 60 + int(float(s))

    total = sum(_to_seconds(t) for t in start_times_in_str)
    return str(timedelta(seconds=total / len(start_times_in_str)))
# Average open and close times across all non-holiday business days.
avg_time_in_str(start_times_in_str)
avg_time_in_str(close_times_in_str)
# Plot raw NMIA3 (missing values left untouched) over Jan–Mar 2018.
tdf = pd.read_csv(prefix_path + 'NMIA3.csv')
# Missing-value handling reference:
# https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.interpolate.html
# 'nearest' beats linear/cubic here: it creates no abnormal values.
tdf = tdf.set_index(pd.DatetimeIndex(tdf['AESTTime']))
tdf['2018-01-01':'2018-03-01'].plot(figsize=(18, 6))
Note: we already preprocessed this file earlier on.
file_map['NMIA3.csv']['2018-01-01':'2018-03-01'].plot(figsize=(18,6))
Peak detection includes local minima.
We need to run this on a moving average of 10 to get the coordinates of the local minima: https://gist.github.com/gcalmettes/1784428
Possible result: http://billauer.co.il/peakdet.html
Run python script that use EM to improve checkpoint detection algorithm
Kmean (k=2) Clustering on low and high usage value.
Classify into top and bottom group. (Ignoring middle point maybe)
Within a day, use last high's time to set business close time
use last low's time to set business start time
switching autoregressive HMM
notebook in: ECG_data/switching_autoregressive_HMM.ipynb
idea: https://stackoverflow.com/questions/11752727/pattern-recognition-in-time-series/11903770
work: I've converted the Python 2 code to 3; you need to download the ECG data.
The result is quite promising, though it might have performance issues and corner cases.
The code is very simple. It needs to be code-reviewed and published to PyPI for further use.

Modify the moving z-score algorithm for adaptation.

# Experiment 1: Peak detection includes local minima.
# We need to run this on a moving average of 10 to get the coordinates of local minima
# https://gist.github.com/gcalmettes/1784428
# Possible result:
# http://billauer.co.il/peakdet.html
# Experiment 2: Run python script that use EM to improve checkpoint detection algorithm
# Experiment 3: Clustering on low and high usage value.
# Classify top and bottom.
# Within a day, use last high's time to set business close time
# use last low's time to set business start time
AWS SageMaker for model inference
# ===========================================================================
# Consolidated pipeline script: detect business open/close times per meter.
# ===========================================================================
import os
from datetime import timedelta

import pandas as pd
import numpy as np
import ruptures as rpt

# Interval CSVs.
prefix_path = 'interval_data/'
file_names = sorted(os.listdir(prefix_path))

# Holiday calendar joined onto meters by state.
nmi = pd.read_csv('nmi_info.csv')
holidays = pd.read_csv('holidays.csv')
merged = nmi.merge(holidays, on='State')\
            .drop(['OutInterval', 'State'], axis=1)\
            .drop_duplicates()
# reset_index(drop=True) replaces `.reset_index().drop('index', 1)` — the
# positional `axis` argument was removed in pandas 2.0.
nmi_with_holiday = merged.sort_values(['Nmi', 'LocalDate']).reset_index(drop=True)
# Preprocess every file into a clean 30-minute series keyed by file name.
file_map = {}
for name in file_names:
    df = pd.read_csv(prefix_path + name)
    # Fill missing readings by nearest-neighbour interpolation.
    df.E = df.E.interpolate('nearest')
    # Index on the timestamp column.
    df = df.set_index(pd.DatetimeIndex(df['AESTTime'])).drop(['AESTTime'], axis=1)
    # Uniform 30-minute grid (the '30T' alias is deprecated; use '30min').
    df = df.resample('30min').mean()
    # One year of half-hour slots: 365 * 48 = 17520.
    assert df.shape[0] == 17520
    file_map[name] = df
def avg_time_in_str(start_times_in_str):
    """Average a list of 'HH:MM:SS' strings and return the mean as a string.

    Each time becomes seconds since midnight, the values are averaged,
    and the mean is rendered back through ``timedelta``.

    Bug fix: the original dropped the seconds component of each time
    (it only used hours and minutes); seconds are now included.
    """
    def _to_seconds(hms):
        h, m, s = hms.split(':')
        return int(h) * 3600 + int(m) * 60 + int(float(s))

    total = sum(_to_seconds(t) for t in start_times_in_str)
    return str(timedelta(seconds=total / len(start_times_in_str)))
# Detect and report the average business open/close time for every meter.
for test_file_name in file_names:
    testdf = file_map[test_file_name]
    input_y = testdf['E']
    # Series.as_matrix() was removed in pandas 1.0; use to_numpy().
    signal = input_y.to_numpy()
    # Penalized change-point detection (PELT with the RBF cost).
    m = rpt.Pelt(model="rbf", min_size=10, jump=5).fit(signal)
    result = m.predict(pen=6)
    # Pelt appends the series length (17520) as a sentinel breakpoint; drop
    # it defensively instead of a bare remove(), which raises if absent.
    if result and result[-1] == len(signal):
        result.pop()
    # Rows at the detected change points.
    rdf = testdf.iloc[result]
    # Keep only dates with exactly two change points (open AND close).
    filted_biz_hr = rdf.groupby(rdf.index.date).filter(lambda x: x.count() == 2)
    filted_biz_hr = filted_biz_hr.reset_index()
    filted_biz_hr['T'] = filted_biz_hr.AESTTime.dt.date
    # This meter's holidays ('.csv' suffix stripped, e.g. 'NMIA1').
    related_holiday = nmi_with_holiday[nmi_with_holiday['Nmi'] == test_file_name[:-4]]
    related_holiday = related_holiday.reset_index()
    related_holiday = related_holiday.rename(columns={'LocalDate': 'T'})
    # Align the merge-key dtypes, then anti-join the holidays away.
    filted_biz_hr['T'] = pd.to_datetime(filted_biz_hr['T'])
    related_holiday['T'] = pd.to_datetime(related_holiday['T'])
    # (Positional `axis` was removed from drop() in pandas 2.0; pass axis=1.)
    filted_biz_hr = filted_biz_hr.merge(related_holiday, on='T', how='left', indicator=True)\
        .query('_merge == "left_only"').drop(['_merge', 'T', 'Nmi'], axis=1)
    # Alternating rows are open/close times.
    base_times = filted_biz_hr.AESTTime.dt.time
    start_times = base_times[::2]
    close_times = base_times[1::2]
    # Stringify for the averaging helper.
    start_times_in_str = [str(t) for t in start_times.values]
    close_times_in_str = [str(t) for t in close_times.values]
    print('\n' + test_file_name)
    print(avg_time_in_str(start_times_in_str))
    print(avg_time_in_str(close_times_in_str))
https://docs.aws.amazon.com/sagemaker/latest/dg/getting-started-client-app.html https://docs.aws.amazon.com/sagemaker/latest/dg/your-algorithms-training-algo.html https://docs.aws.amazon.com/sagemaker/latest/dg/your-algorithms-inference-main.html
For mobile device calling Lambda https://docs.aws.amazon.com/lambda/latest/dg/with-android-example.html
Sample Notebook:
Training data store in s3
SageMaker consume s3 to build model
SageMaker deploy custom model as an end point
InvokeEndpoint from AWS Lambda
Other services involved could be ECS docker, API gateway and load balancer for model endpoint a/b testing. Sagemaker allows you to build and store model on s3, so rollback is quite easy.
I've deployed DeepAR from the GitHub repo, with a SageMaker role.
# Assemble the SageMaker CreateTrainingJob request.
# NOTE(review): `image`, `role`, `output_location`, `job_name`, and
# `data_location` are defined in earlier notebook cells — confirm before reuse.
create_training_params = \
{
# Container image and input mode for the training algorithm.
"AlgorithmSpecification": {
"TrainingImage": image,
"TrainingInputMode": "File"
},
# IAM role SageMaker assumes to read/write S3 on our behalf.
"RoleArn": role,
# Where the trained model artifacts are written.
"OutputDataConfig": {
"S3OutputPath": output_location
},
# Training cluster: 2 x ml.c4.8xlarge instances with 50 GB volumes each.
"ResourceConfig": {
"InstanceCount": 2,
"InstanceType": "ml.c4.8xlarge",
"VolumeSizeInGB": 50
},
"TrainingJobName": job_name,
# Algorithm hyperparameters (presumably k-means — see `kmeans.deploy`
# below: 10 clusters over 784-dimensional features).
"HyperParameters": {
"k": "10",
"feature_dim": "784",
"mini_batch_size": "500",
"force_dense": "True"
},
# Hard stop after one hour.
"StoppingCondition": {
"MaxRuntimeInSeconds": 60 * 60
},
# Training channel: replicate the full S3 prefix onto every instance.
"InputDataConfig": [
{
"ChannelName": "train",
"DataSource": {
"S3DataSource": {
"S3DataType": "S3Prefix",
"S3Uri": data_location,
"S3DataDistributionType": "FullyReplicated"
}
},
"CompressionType": "None",
"RecordWrapperType": "None"
}
]
}
# Launch the training job and print its initial status.
sagemaker = boto3.client('sagemaker')
sagemaker.create_training_job(**create_training_params)
status = sagemaker.describe_training_job(TrainingJobName=job_name)['TrainingJobStatus']
print(status)
# deploy
# Deploy the trained estimator behind a real-time inference endpoint.
# NOTE(review): `kmeans`, `endpoint_name`, `endpoint_config_name`, `runtime`,
# `np2csv`, and `train_set` come from earlier notebook cells.
kmeans_predictor = kmeans.deploy(initial_instance_count=1,
instance_type='ml.m4.xlarge')
# create endpoint
create_endpoint_response = sagemaker.create_endpoint(
EndpointName=endpoint_name,
EndpointConfigName=endpoint_config_name)
# testing endpoint
import json
# Send one training row as CSV and decode the JSON prediction response.
payload = np2csv(train_set[0][30:31])
response = runtime.invoke_endpoint(EndpointName=endpoint_name,
ContentType='text/csv',
Body=payload)
result = json.loads(response['Body'].read().decode())
print(result) # OK